[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf #37972
Conversation
array type support for from_proto function.
1) Basic Message repeated once 2) Basic Message repeated twice
unit tests for protobuf repeated message
… into SPARK_PROTO_1
1) Primitive types (TBD: Decimal) 2) Deserialization (push down filters not working yet) Formatting errors to keep the IDE happy
matches with the message structure
successfully but also creates an UnknownFieldSet and uses it to infer that there was a problem with the schema (see the sketch after this commit list).
… into SPARK_PROTO_1
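A minimal sketch of that unknown-field check, assuming protobuf-java's DynamicMessage API; `descriptor` and `binaryPayload` are placeholder names, not the PR's actual code:

```scala
import com.google.protobuf.{Descriptors, DynamicMessage}

// Parsing succeeds even when the payload was written with a different schema;
// any fields the supplied descriptor does not recognize land in the UnknownFieldSet,
// which can be used to flag a probable schema mismatch.
def schemaLooksWrong(descriptor: Descriptors.Descriptor, binaryPayload: Array[Byte]): Boolean = {
  val message = DynamicMessage.parseFrom(descriptor, binaryPayload)
  !message.getUnknownFields.asMap().isEmpty
}
```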
@SandishKumarHN This PR LGTM overall.
LGTM, let's have a follow-up for documentation and other improvements.
@SandishKumarHN @mposdev21 Thanks for the great work!
@SandishKumarHN I am going to set you as the assignee in https://issues.apache.org/jira/browse/SPARK-40654. Do you have a jira account?
@gengliangwang yes, SandishKumarHN or [email protected]
@@ -1208,6 +1254,9 @@ object CopyDependencies {
      if (jar.getName.contains("spark-connect") &&
        !SbtPomKeys.profiles.value.contains("noshade-connect")) {
        Files.copy(fid.toPath, destJar.toPath)
      } else if (jar.getName.contains("spark-protobuf") &&
        !SbtPomKeys.profiles.value.contains("noshade-protobuf")) {
        Files.copy(fid.toPath, destJar.toPath)
You have the variable fidProtobuf, but you don't use it anywhere. Possibly it should be used here?
@bersprockets thanks for pointing this out; I have made changes in the follow-up PR.
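For reference, the fix being discussed would presumably look like the fragment below (assuming fidProtobuf is the resolved spark-protobuf artifact, analogous to fid):

```scala
// Hypothetical follow-up: copy the protobuf artifact rather than reusing `fid`.
} else if (jar.getName.contains("spark-protobuf") &&
    !SbtPomKeys.profiles.value.contains("noshade-protobuf")) {
  Files.copy(fidProtobuf.toPath, destJar.toPath)
}
```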
@@ -59,7 +59,7 @@ object BuildCommons {
  ) = Seq(
    "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
    "tags", "sketch", "kvstore"
  ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect)
  ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect) ++ Seq(protobuf)
This change is not needed right? (noticed this while fixing merge conflicts for Databricks's own repo).
I mean protobuf is already included in sqlProjects.
@rangadi yes, it looks like this change is not needed. Will make this change in a follow-up PR.
@@ -390,7 +390,7 @@ object SparkBuild extends PomBuild {
  val mimaProjects = allProjects.filterNot { x =>
    Seq(
      spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn,
      unsafe, tags, tokenProviderKafka010, sqlKafka010, connect
      unsafe, tags, tokenProviderKafka010, sqlKafka010, connect, protobuf
What is this for? Avro is not included here.
@rangadi it is part of shading the google-protobuf jar through sbt, so the protobuf module is passed through MimaBuild (a tool for identifying binary incompatibilities in Scala libraries).
Thanks @SandishKumarHN. How do we trigger this? With sbt build?
@rangadi there is a script that triggers it: dev/mima
@@ -1107,10 +1152,10 @@ object Unidoc {
  (ScalaUnidoc / unidoc / unidocProjectFilter) :=
    inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
      yarn, tags, streamingKafka010, sqlKafka010, connect),
      yarn, tags, streamingKafka010, sqlKafka010, connect, protobuf),
Same here: Is this required? Wasn't needed for Avro. I don't have much context here. cc: @gengliangwang, @SandishKumarHN
@rangadi I was following the connector/connect module for shading jars and thought this was needed. Based on sbt-unidoc, it skips Scaladoc generation for the protobuf project. Not sure if we need this or not.
  (JavaUnidoc / unidoc / unidocProjectFilter) :=
    inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
      yarn, tags, streamingKafka010, sqlKafka010, connect),
      yarn, tags, streamingKafka010, sqlKafka010, connect, protobuf),
Same.
Added two tickets to the Epic:
… to_protobuf

From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com)

This PR follows main PR #37972. The following is an example of how to use from_protobuf and to_protobuf in PySpark.

```python
data = [("1", (2, "Alice", 109200))]
ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
df = spark.createDataFrame(data, ddl_schema)
desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
    '26F746F33')
import tempfile

# Writing a protobuf description into a file, generated by using the
# connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
with tempfile.TemporaryDirectory() as tmp_dir:
    desc_file_path = "%s/pyspark_test.desc" % tmp_dir
    with open(desc_file_path, "wb") as f:
        _ = f.write(bytearray.fromhex(desc_hex))
        f.flush()
        message_name = 'SimpleMessage'
        proto_df = df.select(to_protobuf(df.value,
            desc_file_path, message_name).alias("value"))
        proto_df.show(truncate=False)
        proto_df = proto_df.select(from_protobuf(proto_df.value,
            desc_file_path, message_name).alias("value"))
        proto_df.show(truncate=False)

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+

+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
```

### Tests Covered
- from_protobuf / to_protobuf (functions.py)

Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF.

Authored-by: SandishKumarHN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
testingTypes.foreach { dt =>
  val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
  val filePath = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
  test(s"single $dt with seed $seed") {
@SandishKumarHN Is it a flaky test?
#38286 itself is blocked by other flaky tests.
sad irony or bad karma, can't decide ;)
…protobuf

From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com)

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. They are widely used in Kafka-based data pipelines. Unlike Avro, Spark does not have native support for protobuf. This PR provides two new functions, from_protobuf/to_protobuf, to read and write Protobuf data within a data frame. The implementation is closely modeled after the Avro implementation so that it is easy to understand and review the changes.

Following is an example of typical usage.

```scala
// `from_proto` requires the absolute path of the Protobuf schema file
// and the protobuf message within the file
val userProtoFile = "./examples/src/main/resources/user.desc"
val userProtoMsg = "User"

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "proto-topic-in")
  .load()

// 1. Decode the Protobuf data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Protobuf format.
val output = df
  .select(from_protobuf('value, userProtoFile, userProtoMsg) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_protobuf($"user.name", userProtoFile, userProtoMsg) as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "proto-topic-out")
  .start()
```

The new functions are very similar to Avro:
- from_protobuf requires the proto descriptor file and the message type within that file, which is similar to from_avro requiring the JSON schema.
- to_protobuf is similar to to_avro and does not require the proto descriptor file, as it can build the schema (protobuf descriptor) from the catalyst types. Similarly, to_proto (like to_avro) can also take in the descriptor file for describing the schema.

What is supported:
- Protobuf format proto3 is supported (even though proto2 and proto3 are inter-operable, we have explicitly tested only with proto3).
- Google protobuf supported types: scalar value types, enumerations, message types as field types, nested messages, maps.
- Unknown fields are well-formed protocol buffer serialized data representing fields that the parser does not recognize. The original version of proto3 did not include this when there are parsing problems. This feature is needed to detect schemas that do not match the message type and to support FAILFAST and PERMISSIVE modes. It is available in proto3 from version 3.5 onwards.

What is not supported:
- Any requires knowledge of the underlying object type when deserializing the message and is generally not considered type safe.
- OneOf requires knowledge of the object type that was encoded when deserializing the message.
- Custom Options is an advanced feature within protobuf where users can define their own options.
- Catalyst types that are not natively supported in protobuf. This happens normally during serialization, and an exception will be thrown when the following types are encountered: DecimalType, DateType, TimestampType.

Tests have been written to test at different levels:
- from_protobuf / to_protobuf (ProtoFunctionSuite)
- ProtoToCatalyst / CatalystToProto (ProtoCatalystDataConversionSuite)
- ProtoDeserializer / ProtoSerializer (ProtoSerdeSuite)

ProtoFunctionSuite: a bunch of roundtrip tests go through to_protobuf(from_proto) or from_protobuf(to_proto) and compare the results. It also repeats some of the tests where to_protobuf is called without a descriptor file, in which case the protobuf descriptor is built from the catalyst types.
- roundtrip in to_protobuf and from_protobuf for struct for protobuf scalar types
- roundtrip in to_protobuf (without descriptor params) and from_proto - struct for protobuf scalar types
- roundtrip in from_protobuf and to_protobuf - Repeated protobuf types
- roundtrip in from_protobuf and to_protobuf - Repeated Message Once
- roundtrip in from_protobuf and to_protobuf - Repeated Message Twice
- roundtrip in from_protobuf and to_protobuf - Map
- roundtrip in from_protobuf and to_protobuf - Enum
- roundtrip in from_protobuf and to_protobuf - Multiple Message
- roundtrip in to_protobuf and from_protobuf - with null

ProtoSerdeSuite:
- Test basic conversion - serialize(deserialize(message)) == message
- Fail to convert with field type mismatch - make sure the right exception is thrown for an incompatible schema for serializer and deserializer
- Fail to convert with missing nested Protobuf fields
- Fail to convert with deeply nested field type mismatch
- Fail to convert with missing Catalyst fields

ProtoCatalystDataConversionSuite:
- ProtoToCatalyst(to_protobuf(basic_catalyst_types)): Boolean, Integer, Double, Float, Binary, String, Byte, Short
- Handle unsupported input of Message type: serialize a message first and deserialize it using a bad schema. Test with FAILFAST to get an exception and PERMISSIVE to get a null row
- filter push-down to proto deserializer: filtering the rows based on the filter during proto deserialization
- Test ProtoDeserializer with binary message type

Cluster testing: recent (10-04-2022) changes have been tested with the configurations listed below.
- Job: Kafka + Spark Structured Streaming
- 2 executors, each with 2048m and 2 cores
- 150-200 events/second, each event having 100 fields (basic types, message, map type, enum)

Closes apache#37972 from SandishKumarHN/SPARK_PROTO_1.

Lead-authored-by: SandishKumarHN <[email protected]>
Co-authored-by: Sandish Kumar Hebbani Naga <[email protected]>
Co-authored-by: Mohan Parthasarathy <[email protected]>
Co-authored-by: sandishkumarhn <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
…lasses

This is the follow-up PR to #37972 and #38212

### What changes were proposed in this pull request?
1. Move spark-protobuf error classes to the spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support protobuf imports.
3. Validate protobuf timestamp and duration types.

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
None

### How was this patch tested?
Existing tests should cover the validation of this PR.

CC: rangadi mposdev21 gengliangwang

Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

Authored-by: SandishKumarHN <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  StringType -> "StringMsg")

testingTypes.foreach { dt =>
  val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
this one fails when seed = 38, see https://github.com/MaxGekk/spark/actions/runs/3391675654/jobs/5638519849
@MaxGekk This problem was solved by #38286. Was your fork in sync with master? I can open a JIRA if it fails again. I repeated the test with the above seed = 38. There were no failures.
[info] ProtobufCatalystDataConversionSuite:
[info] - single StructType(StructField(int32_type,IntegerType,true)) with seed 38 (67 milliseconds)
[info] - single StructType(StructField(double_type,DoubleType,true)) with seed 38 (22 milliseconds)
[info] - single StructType(StructField(float_type,FloatType,true)) with seed 38 (18 milliseconds)
[info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 38 (23 milliseconds)
[info] - single StructType(StructField(string_type,StringType,true)) with seed 38 (27 milliseconds)
[info] - Handle unsupported input of message type (28 milliseconds)
[info] - filter push-down to Protobuf deserializer (13 milliseconds)
[info] - ProtobufDeserializer with binary type (0 milliseconds)
[info] - Full names for message using descriptor file (1 millisecond)
This happened on the recent master. The commit is 216a5d2c39ea3d9e869818692b2f4c0f2652aa56:
$ git diff
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 271c5b0fec..080bf1eb1f 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -123,7 +123,7 @@ class ProtobufCatalystDataConversionSuite
StringType -> ("StringMsg", ""))
testingTypes.foreach { dt =>
- val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
+ val seed = 38
test(s"single $dt with seed $seed") {
val (messageName, defaultValue) = catalystTypesToProtoMessages(dt.fields(0).dataType)
$ build/sbt "test:testOnly *ProtobufCatalystDataConversionSuite"
[info] - single StructType(StructField(double_type,DoubleType,true)) with seed 38 *** FAILED *** (10 milliseconds)
[info] java.lang.NullPointerException:
[info] at org.apache.spark.sql.protobuf.ProtobufCatalystDataConversionSuite.$anonfun$new$2(ProtobufCatalystDataConversionSuite.scala:134)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
Here is the JIRA for the bug: https://issues.apache.org/jira/browse/SPARK-41015
@MaxGekk thank you, will look into it.
…nition from `protobuf`

### What changes were proposed in this pull request?
This PR removes the unused object definition `ScalaReflectionLock` from the `protobuf` module. `ScalaReflectionLock` is a definition at the access scope of the `protobuf` package, which was defined in SPARK-40654 | #37972 and became unused in SPARK-41639 | #39147.

### Why are the changes needed?
Clean up unused definitions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47459 from LuciferYang/remove-ScalaReflectionLock.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
From SandishKumarHN([email protected]) and Mohan Parthasarathy([email protected])
Introduction
Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. They are widely used in Kafka-based data pipelines. Unlike Avro, Spark does not have native support for protobuf. This PR provides two new functions, from_protobuf and to_protobuf, to read and write Protobuf data within a DataFrame.
The implementation is closely modeled after the Avro implementation so that it is easy to understand and review the changes.
Following is an example of typical usage.
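The snippet below reproduces the Kafka streaming example from the squashed commit message earlier in this thread; the descriptor path, topic names, and message name are illustrative.

```scala
// `from_protobuf` requires the absolute path of the Protobuf descriptor file
// and the name of the message within that file.
val userProtoFile = "./examples/src/main/resources/user.desc"
val userProtoMsg = "User"

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "proto-topic-in")
  .load()

// 1. Decode the Protobuf data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` back into Protobuf format.
val output = df
  .select(from_protobuf('value, userProtoFile, userProtoMsg) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_protobuf($"user.name", userProtoFile, userProtoMsg) as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "proto-topic-out")
  .start()
```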
The new functions are very similar to Avro
What is supported
What is not supported
Test cases covered
Tests have been written to test at different levels
ProtoFunctionSuite
A bunch of roundtrip tests go through to_protobuf(from_proto) or from_protobuf(to_proto) and compare the results. It also repeats some of the tests where to_protobuf is called without a descriptor file, in which case the protobuf descriptor is built from the catalyst types.
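A rough illustration of the roundtrip pattern (not the suite's actual code), assuming the argument order used in the examples above (column, descriptor file path, message name); inputDf, userProtoFile, and userProtoMsg are placeholders, and checkAnswer is the DataFrame-comparison helper from Spark's SQL test utilities:

```scala
// Encode a struct column with to_protobuf, decode it back with from_protobuf,
// and verify the decoded rows equal the original input.
val roundTripped = inputDf
  .select(to_protobuf($"user", userProtoFile, userProtoMsg) as "proto_bytes")
  .select(from_protobuf($"proto_bytes", userProtoFile, userProtoMsg) as "user")

checkAnswer(roundTripped, inputDf)
```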
ProtoSerdeSuite
ProtoCatalystDataConversionSuite
Cluster Testing
Recent (10-04-2022) changes have been tested with the configurations listed below.
Job: Kafka + Spark Structured Streaming
2 executors, each with 2048m and 2 cores
150-200 events/second, each event having 100 fields (basic types, message, map type, enum)